以下參考課程 LLM Twin: Building Your Production-Ready AI Replica 撰寫
在資料處理領域,資料流處理和批量處理是兩種常見且關鍵的技術。隨著系統需要即時處理大量持續流入的資料,資料流處理相比批量處理,具有更低的延遲,使其特別適合需要快速反應的即時分析和系統監控場景。
在本篇文章中,我們將介紹如何使用 Bytewax 和 Qdrant 來實現資料流處理的架構:
本文將逐步展示如何從 RabbitMQ 隊列接收資料,經過多層處理後,將清理和嵌入的資料儲存到 Qdrant。我們將首先比較資料流處理與批量處理的異同,然後展示具體的資料處理架構。
特點 | 批處理 | 資料流處理 |
---|---|---|
處理方式 | 集中處理大批量數據 | 即時處理每一條數據 |
延遲 | 高 | 低 |
適用場景 | 非即時任務、批量報告 | 即時分析、監控 |
資源利用率 | 資源利用集中於處理時間點 | 資源利用平均且持續 |
可控性 | 易於管理大規模數據,適合預定計劃 | 適應動態、連續的數據流 |
在我們的系統中,資料來自 RabbitMQ 隊列,並且需要進行即時處理。因此,選擇資料流處理是最佳解決方案。資料在處理完畢後會被快速地存入 Qdrant 向量資料庫,以供即時檢索。這種架構可以大幅減少延遲,並確保系統能夠實時處理資料。
Bytewax 是一個靈活的資料流處理框架,允許我們通過 輸入(Input) 、 處理(Processing) 、 輸出(Output) 三個步驟來設計資料流管道。接下來將展示如何使用 Bytewax 實現從 RabbitMQ 到 Qdrant 的即時處理過程。
首先,我們從 RabbitMQ
隊列中讀取資料,將其引入 Bytewax
流程中。
flow = Dataflow("Streaming ingestion pipeline")
stream = op.input("input", flow, RabbitMQSource())
Dataflow()
:這行程式碼建立了一個資料流處理管道,並命名為 "Streaming ingestion pipeline"
。Dataflow
是 Bytewax
的核心架構,負責組織整個資料流的處理步驟。
op.input()
:這個函式會從定義的資料來源讀取資料。在這個例子中,我們指定了來自 RabbitMQ
的資料源 (RabbitMQSource())
,表示資料將從 RabbitMQ
消息隊列中進行讀取。
stream
:變數 stream
是資料流的起點,它接收到的資料會推送進資料處理管道,後續處理步驟會依次對這些資料進行清理、轉換等操作。
在此步驟中,資料流管道將接收到的原始資料逐步轉換為最終的嵌入向量。這個過程分為多層處理,包括消息的結構化處理、數據清理、分塊以及嵌入。
首先,我們需要將從 RabbitMQ
隊列中讀取的原始數據轉換為結構化的 Pydantic
模型,以便在後續的處理步驟中進行進一步操作。
stream = op.map("raw dispatch", stream, RawDispatcher.handle_mq_message)
op.map()
:map
是 Bytewax
中的一個操作,它會對每條資料進行處理並返回一個新的資料流。在這裡,我們將原始資料進行轉換。RawDispatcher.handle_mq_message
:這裡調用了 RawDispatcher
中的 handle_mq_message
方法,該方法將從 RabbitMQ
接收到的原始資料轉換為 Pydantic
模型,為後續的處理步驟做準備。接下來,我們對資料進行清理,移除無效字符、符號等不必要的內容,並保留對後續處理有用的部分。
stream = op.map("clean dispatch", stream, CleaningDispatcher.dispatch_cleaner)
CleaningDispatcher.dispatch_cleaner
:這裡調用了 CleaningDispatcher
中的 dispatch_cleaner
方法,該方法負責根據資料的類型,選擇合適的清理器來清理數據。對清理後的數據進行分塊,便於嵌入處理:
stream = op.flat_map("chunk dispatch", stream, ChunkingDispatcher.dispatch_chunker)
op.flat_map()
:將單個數據轉換為多個塊(chunk
),每個塊會作為獨立的資料進行後續處理。ChunkingDispatcher.dispatch_chunker
:這裡調用了 ChunkingDispatcher
中的 dispatch_chunker
方法,該方法負責將資料分塊,使每個塊都能獨立進行嵌入處理。最後,將分塊後的數據轉換為向量表示形式,這些向量將用於後續檢索或分析:
stream = op.map("embedded chunk dispatch", stream, EmbeddingDispatcher.dispatch_embedder)
EmbeddingDispatcher.dispatch_embedder
:這裡調用了 EmbeddingDispatcher
的 dispatch_embedder
方法,該方法負責將分塊後的數據轉換為嵌入向量。在資料處理完畢後,最後一步是將處理後的數據存儲到 Qdrant
向量資料庫。
op.output("cleaned data insert to qdrant", stream, QdrantOutput(connection=connection, sink_type="clean"))
op.output("embedded data insert to qdrant", stream, QdrantOutput(connection=connection, sink_type="vector"))
op.output()
:這是 Bytewax
用來將資料流輸出到外部系統的操作。這裡指定了數據的目標系統為 Qdrant
。stream
:資料流,這裡表示要處理的資料流是已經完成清理和嵌入的數據。QdrantOutput(connection=connection, sink_type="clean")
:這裡我們定義了輸出的數據類型和數據庫連接。sink_type="clean"
表示存入的是清理後的數據。QdrantOutput(connection=connection, sink_type="vector")
:這裡定義了嵌入後數據的輸出,將嵌入向量存儲到 Qdrant 的向量集合中。Qdrant
是一個專門用來存儲向量數據的資料庫,特別適合於像嵌入向量這樣的大規模高維數據。在資料流處理過程中,處理完的數據需要被高效地存儲到 Qdrant 向量資料庫中。為了實現這個過程,Bytewax 提供了兩個工具:DynamicSink
和 StatelessSinkPartition
。
DynamicSink
和 StatelessSinkPartition
DynamicSink
:這個工具允許我們根據資料的不同狀態或類型動態構建資料輸出管道,並將資料輸送到不同的目的地。它可以靈活地根據處理邏輯,選擇不同的接收器來存儲資料,這對於多種類型資料的處理系統尤為重要。
StatelessSinkPartition
:這個工具負責處理具體的資料寫入邏輯,將處理後的資料分批寫入資料庫。它是 DynamicSink
實際寫入資料的具體實現。在這裡,我們使用不同的 StatelessSinkPartition
類別來處理清理後和嵌入後的資料,並將它們分別存入 Qdrant
的不同集合。
以下展示如何使用 DynamicSink
和 StatelessSinkPartition
根據資料狀態選擇不同的接收器來處理資料:
from bytewax.outputs import DynamicSink, StatelessSinkPartition
from db.qdrant import QdrantDatabaseConnector
class QdrantOutput(DynamicSink):
def __init__(self, connection: QdrantDatabaseConnector, sink_type: str) -> None:
self._connection = connection
self._sink_type = sink_type
def build(self, worker_index: int, worker_count: int) -> StatelessSinkPartition:
if self._sink_type == "clean":
return QdrantCleanedDataSink(self._connection)
elif self._sink_type == "vector":
return QdrantVectorDataSink(self._connection)
QdrantOutput(DynamicSink)
:QdrantOutput
繼承了 DynamicSink
,並根據 sink_type
動態選擇要使用的接收器。sink_type
會根據資料是清理後的還是嵌入後的來選擇不同的寫入邏輯。build()
:build()
是一個動態方法,根據傳入的 sink_type
返回相應的資料接收器。例如,當 sink_type
為 "clean"
時,它會返回 QdrantCleanedDataSink
;當 sink_type
為 "vector"
時,則返回 QdrantVectorDataSink
。這段程式碼的作用是根據數據的不同狀態(清理後或嵌入後),自動選擇適當的接收器來將數據寫入 Qdrant
。
這一部分的邏輯負責將清理後的數據寫入到 Qdrant
的指定集合中。QdrantCleanedDataSink
是用來處理清理後資料的接收器,它使用 write_batch()
方法來將資料批量寫入資料庫。
class QdrantCleanedDataSink(StatelessSinkPartition):
def __init__(self, connection: QdrantDatabaseConnector):
self._client = connection
def write_batch(self, items: List[DBDataModel]) -> None:
payloads = [item.save() for item in items]
ids, data = zip(*payloads)
collection_name = get_clean_collection(...)
self._client.write_data(
collection_name=collection_name,
points=Batch(ids=ids, vectors={}, payloads=data),
)
QdrantCleanedDataSink(StatelessSinkPartition)
:這是處理清理後資料的接收器,它繼承了 StatelessSinkPartition
,負責具體的資料寫入邏輯。write_batch()
:這個方法會接收批量的資料(items
),並將其保存到指定的 Qdrant
集合中。每一個資料會先進行序列化(item.save()
),並提取出需要儲存的 ids
和 data
,然後調用 self._client.write_data()
將它們寫入到對應的集合中。與清理後資料的邏輯相似,嵌入後的數據會被存儲到向量集合中。QdrantVectorDataSink
負責處理這些嵌入後的向量數據,並將它們存入 Qdrant
。
class QdrantVectorDataSink(StatelessSinkPartition):
def __init__(self, connection: QdrantDatabaseConnector):
self._client = connection
def write_batch(self, items: List[DBDataModel]) -> None:
payloads = [item.save() for item in items]
ids, vectors, metadata = zip(*payloads)
collection_name = get_vector_collection(...)
self._client.write_data(
collection_name=collection_name,
points=Batch(ids=ids, vectors=vectors, payloads=metadata),
)
QdrantVectorDataSink(StatelessSinkPartition)
:這個接收器專門用來處理嵌入後的向量資料,它會將嵌入的向量和相關的 metadata
批量寫入 Qdrant
的向量集合中。write_batch()
:與清理後數據的邏輯類似,這個方法會接收批量的嵌入後資料據(items
),並通過 item.save()
來序列化每條資料。然後將 ids
、vectors
和 metadata
提取出來,最終寫入到 Qdrant
中的指定向量集合。ref.